/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.dataflow.worker.windmill.client;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.TerminatingStreamObserver;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.slf4j.Logger;

/**
 * Request observer wrapper whose underlying delegate can be swapped out via {@link
 * #reset(TerminatingStreamObserver)}.
 *
 * @implNote Delegates installed through {@link #reset(TerminatingStreamObserver)} are expected to
 *     be {@link ThreadSafe}, since {@link #onNext(Object)} invokes them without holding this
 *     object's lock. Mirrors the methods declared in {@link StreamObserver}, except that they
 *     throw {@link StreamClosedException} and {@link WindmillStreamShutdownException}, which must
 *     be handled by callers.
 */
@ThreadSafe
@Internal
final class ResettableThrowingStreamObserver<T> {

  private final Logger logger;

  /**
   * Observer that currently receives calls. It is {@code null} until the first {@link
   * #reset(TerminatingStreamObserver)} and again once {@link #poison()} has run.
   */
  @GuardedBy("this")
  private @Nullable TerminatingStreamObserver<T> delegateStreamObserver;

  /** Set by {@link #poison()}; nothing in this class clears it afterwards. */
  @GuardedBy("this")
  private boolean isPoisoned = false;

  /**
   * Whether the current delegate can no longer be used. Starts out {@code true} because no
   * delegate exists yet, and becomes {@code true} again after {@link #onError(Throwable)}, {@link
   * #onCompleted()}, a cancelled {@link #onNext(Object)} on the current delegate, or {@link
   * #poison()} of an installed delegate. Unless poisoned, a call to {@link
   * #reset(TerminatingStreamObserver)} is required before further operations are possible.
   */
  @GuardedBy("this")
  private boolean isCurrentStreamClosed = true;

  /**
   * Creates an observer without a delegate; {@link #reset(TerminatingStreamObserver)} must be
   * called before any stream operation can succeed.
   *
   * @param logger destination for the debug/warn messages emitted while handling failed sends
   */
  ResettableThrowingStreamObserver(Logger logger) {
    // delegateStreamObserver keeps its default value of null until reset(...) installs one.
    this.logger = logger;
  }

  /**
   * Fails if {@link #poison()} has already been called.
   *
   * @throws WindmillStreamShutdownException if this observer is poisoned
   */
  private synchronized void throwIfPoisoned() throws WindmillStreamShutdownException {
    if (isPoisoned) {
      throw new WindmillStreamShutdownException("Stream is already shutdown.");
    }
  }

  /**
   * Returns the delegate that stream operations should currently be forwarded to.
   *
   * @throws WindmillStreamShutdownException if this observer is poisoned
   * @throws StreamClosedException if the current delegate is closed and no reset happened since
   */
  private synchronized StreamObserver<T> delegate()
      throws WindmillStreamShutdownException, StreamClosedException {
    // The poison check comes first so that a poisoned observer always reports shutdown.
    throwIfPoisoned();

    if (!isCurrentStreamClosed) {
      return Preconditions.checkNotNull(delegateStreamObserver, "requestObserver cannot be null.");
    }

    throw new StreamClosedException(
        "Current stream is closed, requires reset() for future stream operations.");
  }

  /**
   * Installs {@code observer} as the delegate for future {@link StreamObserver} methods and marks
   * the current stream as open.
   *
   * @throws WindmillStreamShutdownException if this observer is poisoned
   */
  synchronized void reset(TerminatingStreamObserver<T> observer)
      throws WindmillStreamShutdownException {
    throwIfPoisoned();

    isCurrentStreamClosed = false;
    delegateStreamObserver = observer;
  }

  /**
   * Indicates that the request observer should no longer be used. Any installed delegate is
   * terminated and dropped. Subsequent attempts to perform operations on the request observer
   * will throw a {@link WindmillStreamShutdownException}. Calling this more than once has no
   * further effect.
   */
  synchronized void poison() {
    if (isPoisoned) {
      return;
    }
    isPoisoned = true;

    TerminatingStreamObserver<T> current = delegateStreamObserver;
    if (current == null) {
      // Nothing was installed, so there is nothing to terminate.
      return;
    }

    current.terminate(new WindmillStreamShutdownException("Explicit call to shutdown stream."));
    delegateStreamObserver = null;
    isCurrentStreamClosed = true;
  }

  /**
   * Sends {@code t} on the current delegate. A {@link StreamObserverCancelledException} raised by
   * the send is not rethrown: it is logged and/or forwarded to the failing delegate's {@code
   * onError}. Any other exception thrown by the delegate's {@code onNext} propagates to the
   * caller.
   *
   * @throws StreamClosedException if the current delegate is closed
   * @throws WindmillStreamShutdownException if this observer is poisoned
   */
  public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownException {
    // Capture the delegate once so that onNext and the onError issued on failure target the same
    // StreamObserver instance.
    StreamObserver<T> sender = delegate();
    try {
      // Deliberately sent WITHOUT holding the lock: sending may block, and holding the lock here
      // would stall every other StreamObserver operation.
      sender.onNext(t);
    } catch (StreamObserverCancelledException cancellationException) {
      if (markClosedAfterCancelledSend(sender, cancellationException)) {
        // The failing observer is either the current delegate, which has just been marked
        // closed, or an older delegate that was already replaced. In both cases try to close it
        // and tolerate failures while doing so.
        forwardCancellation(sender, cancellationException);
      }
    }
  }

  /**
   * Updates the closed state after a cancelled send, under the lock.
   *
   * @return {@code true} if {@code sender} should still be closed with the cancellation; {@code
   *     false} if this observer is poisoned or {@code sender} is the current delegate and it is
   *     already marked closed
   */
  private synchronized boolean markClosedAfterCancelledSend(
      StreamObserver<T> sender, StreamObserverCancelledException cancellationException) {
    if (isPoisoned) {
      logger.debug("Stream was shutdown during send.", cancellationException);
      return false;
    }

    boolean senderIsCurrentDelegate = delegateStreamObserver == sender;
    if (!senderIsCurrentDelegate) {
      // A stale delegate does not affect the state of the current stream.
      return true;
    }

    if (isCurrentStreamClosed) {
      logger.debug("Stream is already closed when encountering error with send.");
      return false;
    }

    isCurrentStreamClosed = true;
    return true;
  }

  /**
   * Reports the cancellation to {@code sender} via {@code onError}, outside the lock. An {@link
   * IllegalStateException} is ignored and any other {@link RuntimeException} is only logged.
   */
  private void forwardCancellation(
      StreamObserver<T> sender, StreamObserverCancelledException cancellationException) {
    try {
      sender.onError(cancellationException);
    } catch (IllegalStateException ignored) {
      // The observer was already terminated via onError or onComplete. This is tolerated because
      // it is possibly caused by queued onNext() calls made from previously blocked threads.
    } catch (RuntimeException onErrorException) {
      logger.warn(
          "Encountered unexpected error {} when cancelling due to error.",
          onErrorException,
          cancellationException);
    }
  }

  /**
   * Forwards {@code throwable} to the current delegate's {@code onError}. The current stream is
   * marked closed afterwards regardless of whether the call succeeded or threw.
   *
   * @throws StreamClosedException if the current delegate is closed
   * @throws WindmillStreamShutdownException if this observer is poisoned
   */
  public synchronized void onError(Throwable throwable)
      throws StreamClosedException, WindmillStreamShutdownException {
    try {
      StreamObserver<T> current = delegate();
      current.onError(throwable);
    } finally {
      isCurrentStreamClosed = true;
    }
  }

  /**
   * Forwards completion to the current delegate's {@code onCompleted}. The current stream is
   * marked closed afterwards regardless of whether the call succeeded or threw.
   *
   * @throws StreamClosedException if the current delegate is closed
   * @throws WindmillStreamShutdownException if this observer is poisoned
   */
  public synchronized void onCompleted()
      throws StreamClosedException, WindmillStreamShutdownException {
    try {
      StreamObserver<T> current = delegate();
      current.onCompleted();
    } finally {
      isCurrentStreamClosed = true;
    }
  }

  /** Returns whether the current stream is marked closed, i.e. a reset is needed to use it. */
  synchronized boolean isClosed() {
    return isCurrentStreamClosed;
  }

  /**
   * Indicates that the current delegate is closed, or that none has been installed yet, so {@link
   * #reset(TerminatingStreamObserver)} is required before further stream operations.
   */
  static final class StreamClosedException extends Exception {
    StreamClosedException(String message) {
      super(message);
    }
  }
}
